const { Kafka } = require('kafkajs');

// 创建 Kafka 生产者
/**
 * Publishes one 'Hello Kafka!' message to the 'my-topic' topic using the
 * broker at localhost:9092.
 *
 * The producer is disconnected in a `finally` block so that a failed send
 * does not leave the connection open.
 *
 * @returns {Promise<void>} Resolves after the message has been sent and the
 *   producer has disconnected.
 * @throws Rejects with the error raised by connect, send, or disconnect.
 */
async function produce() {
    const kafka = new Kafka({
        clientId: 'producer',
        brokers: ['localhost:9092']
    });

    // Create and connect the producer instance.
    const producer = kafka.producer();
    await producer.connect();

    try {
        // Send the message.
        await producer.send({
            topic: 'my-topic',
            messages: [
                { value: 'Hello Kafka!' }
            ]
        });
    } finally {
        // Always release the connection, even when the send fails.
        await producer.disconnect();
    }
}

// 创建 Kafka 消费者
/**
 * Starts a consumer in group 'my-group' that reads 'my-topic' from the
 * beginning and logs each message's key, value, headers and timestamp.
 *
 * The consumer is intentionally left connected after this function resolves;
 * it is only disconnected here when subscribing or starting the run fails.
 *
 * @returns {Promise<void>} Resolves once `consumer.run` has resolved.
 * @throws Rejects with the error raised by connect, subscribe, or run.
 */
async function consume() {
    const kafka = new Kafka({
        clientId: 'consumer',
        brokers: ['localhost:9092']
    });

    // Create and connect the consumer instance.
    const consumer = kafka.consumer({ groupId: 'my-group' });
    await consumer.connect();

    try {
        await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

        // Receive messages.
        await consumer.run({
            eachMessage: async ({ message }) => {
                console.log({
                    key: message.key,
                    // A message may carry no value (e.g. a tombstone record);
                    // log null instead of throwing on `.toString()`.
                    value: message.value == null ? null : message.value.toString(),
                    headers: message.headers,
                    timestamp: message.timestamp
                });
            }
        });
    } catch (err) {
        // Setup failed after connecting: release the connection, then rethrow.
        await consumer.disconnect();
        throw err;
    }
}

// 运行生产者和消费者
/**
 * Script driver: awaits `produce()` and nothing else.
 *
 * The consumer step is kept below as a disabled line; enable it to start
 * consuming after the message has been produced.
 *
 * @returns {Promise<void>} Resolves when `produce()` has completed.
 */
async function run() {
    await produce();
    // await consume();
}

// Start the script. On failure, log the error and mark the process as failed
// so callers (shells, CI) see a non-zero exit status instead of 0.
run().catch((err) => {
    console.error(err);
    process.exitCode = 1;
});
